// Copyright 2014 The mqrouter Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package xmq

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"

	"gitee.com/xfrm/middleware/xmq/pulsar"

	"gitee.com/xfrm/middleware/xmq/pub"

	"gitee.com/xfrm/middleware/xmq/delay"
	"gitee.com/xfrm/middleware/xmq/kafka"

	. "gitee.com/xfrm/middleware/internal/middlewareconf"
	"gitee.com/xfrm/middleware/xconfig"
	"gitee.com/xfrm/middleware/xlog"
)

// defaultInstanceManager is the package-level InstanceManager. It is built
// once during package initialization by NewInstanceManager.
var defaultInstanceManager = NewInstanceManager()

// MQRoleType identifies which kind of message-queue client an instance is.
type MQRoleType int

const (
	// The role values start at 0 and increase by one (iota). Their numeric
	// form is what instanceConf.String writes into instance keys and what
	// MQRoleTypeFromInt maps back, so the order here is significant.

	// RoleTypeKafkaReader is a Kafka reader (group or partition reader).
	RoleTypeKafkaReader MQRoleType = iota
	// RoleTypeKafkaWriter is a Kafka writer.
	RoleTypeKafkaWriter
	// RoleTypeDelayClient is a delay-queue client.
	RoleTypeDelayClient
	// RoleTypePulsarConsumer is a Pulsar consumer.
	RoleTypePulsarConsumer
	// RoleTypePulsarProducer is a Pulsar producer.
	RoleTypePulsarProducer

	// instanceConfSep separates the fields of a serialized instanceConf.
	instanceConfSep = "@"
)

var (
	// InvalidMqRoleTypeStringErr is returned by MQRoleTypeFromInt when the
	// given integer does not correspond to any declared MQRoleType.
	InvalidMqRoleTypeStringErr = errors.New("invalid mq role type string")
)

// String returns the short lowercase name of the role: "reader", "writer",
// "delay", "pulsar_consumer" or "pulsar_producer". A value that is not one of
// the declared roles yields the empty string.
func (t MQRoleType) String() string {
	// Name table indexed by role value.
	roleNames := [...]string{
		RoleTypeKafkaReader:    "reader",
		RoleTypeKafkaWriter:    "writer",
		RoleTypeDelayClient:    "delay",
		RoleTypePulsarConsumer: "pulsar_consumer",
		RoleTypePulsarProducer: "pulsar_producer",
	}

	if t < 0 || int(t) >= len(roleNames) {
		return ""
	}
	return roleNames[t]
}

// MQRoleTypeFromInt converts the numeric form of a role back into an
// MQRoleType. The integers 0 through 4 map, in order, to the Kafka reader,
// Kafka writer, delay client, Pulsar consumer and Pulsar producer roles. Any
// other integer returns InvalidMqRoleTypeStringErr together with the zero
// MQRoleType.
func MQRoleTypeFromInt(it int) (t MQRoleType, err error) {
	// Position in this table is the accepted integer value.
	byValue := []MQRoleType{
		RoleTypeKafkaReader,
		RoleTypeKafkaWriter,
		RoleTypeDelayClient,
		RoleTypePulsarConsumer,
		RoleTypePulsarProducer,
	}

	if it < 0 || it >= len(byValue) {
		return t, InvalidMqRoleTypeStringErr
	}
	return byValue[it], nil
}

// instanceConf describes one MQ client instance. Its String form is used as
// the cache key inside InstanceManager.
type instanceConf struct {
	// group is the route group; applyChange compares it with the group parsed
	// from a changed config key.
	group     string
	// role selects which kind of client newInstance builds.
	role      MQRoleType
	// topic is the topic passed to the client constructor.
	topic     string
	// groupId is passed to the Kafka group reader and to the Pulsar consumer.
	// For a Kafka reader an empty groupId selects a partition reader instead.
	groupId   string
	// partition is used only by the Kafka partition reader.
	partition int
}

// String serializes the conf as group, numeric role, topic, groupId and
// partition joined by instanceConfSep. instanceConfFromString parses this
// format back, so both sides share the same separator constant instead of a
// separately hard-coded "@".
func (c *instanceConf) String() string {
	fields := []string{
		c.group,
		// The role is written as its integer value, not its String name, so
		// that MQRoleTypeFromInt can decode it.
		strconv.Itoa(int(c.role)),
		c.topic,
		c.groupId,
		strconv.Itoa(c.partition),
	}
	return strings.Join(fields, instanceConfSep)
}

// instanceConfFromString parses the output of instanceConf.String back into
// an instanceConf.
//
// The layout is group, role, topic, groupId, partition separated by
// instanceConfSep. The first two and the last two fields are taken by
// position and everything in between is re-joined into the topic, so a topic
// that itself contains the separator survives a round trip.
//
// An error is returned when there are fewer than five fields, when the role
// or partition is not an integer, or when the role integer is not a known
// MQRoleType.
func instanceConfFromString(s string) (conf *instanceConf, err error) {
	parts := strings.Split(s, instanceConfSep)
	n := len(parts)
	if n < 5 {
		return nil, fmt.Errorf("invalid instance conf string:%s", s)
	}

	roleNum, err := strconv.Atoi(parts[1])
	if err != nil {
		return nil, err
	}
	role, err := MQRoleTypeFromInt(roleNum)
	if err != nil {
		return nil, err
	}
	partition, err := strconv.Atoi(parts[n-1])
	if err != nil {
		return nil, err
	}

	return &instanceConf{
		group:     parts[0],
		role:      role,
		topic:     strings.Join(parts[2:n-2], instanceConfSep),
		groupId:   parts[n-2],
		partition: partition,
	}, nil
}

// InstanceManager caches MQ client instances keyed by the string form of
// their instanceConf.
type InstanceManager struct {
	// instances maps an instanceConf key (string) to the created client.
	instances sync.Map
	// mutex is held by get while a missing instance is being created.
	mutex     sync.Mutex
}

// NewInstanceManager returns an empty InstanceManager. Before returning, it
// calls watch with a background context so the manager is hooked up to
// config change notifications.
func NewInstanceManager() *InstanceManager {
	manager := new(InstanceManager)
	manager.watch(context.Background())
	return manager
}

// buildKey returns the cache key for conf, which is conf's String form.
func (m *InstanceManager) buildKey(conf *instanceConf) string {
	return conf.String()
}

// confFromKey is the inverse of buildKey: it parses a cache key back into an
// instanceConf and returns the parse error, if any.
func (m *InstanceManager) confFromKey(key string) (*instanceConf, error) {
	return instanceConfFromString(key)
}

// add stores in under the key derived from conf, replacing any instance
// already stored under that key.
func (m *InstanceManager) add(conf *instanceConf, in interface{}) {
	key := m.buildKey(conf)
	m.instances.Store(key, in)
}

// newInstance builds a fresh client for conf according to conf.role:
//
//   - Kafka reader: a group reader when conf.groupId is non-empty, otherwise
//     a partition reader on conf.partition
//   - Kafka writer, delay client, Pulsar producer: built from conf.topic
//   - Pulsar consumer: built from conf.topic and conf.groupId
//
// The constructor's result and error are returned as-is. An unknown role
// yields a nil instance and an error.
func (m *InstanceManager) newInstance(ctx context.Context, conf *instanceConf) (interface{}, error) {
	switch conf.role {
	case RoleTypeKafkaReader:
		if conf.groupId == "" {
			return kafka.NewPartitionReader(ctx, conf.topic, conf.partition)
		}
		return kafka.NewGroupReader(ctx, conf.topic, conf.groupId)

	case RoleTypeKafkaWriter:
		return kafka.NewWriter(ctx, conf.topic)

	case RoleTypeDelayClient:
		return delay.NewDefaultDelayClient(ctx, conf.topic)

	case RoleTypePulsarConsumer:
		return pulsar.NewConsumer(ctx, conf.topic, conf.groupId)

	case RoleTypePulsarProducer:
		return pulsar.NewProducer(ctx, conf.topic)
	}

	return nil, fmt.Errorf("role %d error", conf.role)
}

// get returns the cached instance for conf, creating and caching it on first
// use. It returns nil when the instance cannot be created; the creation
// error is logged, not returned.
//
// The first lookup is done without the mutex. On a miss the mutex is taken,
// the lookup is repeated, and only then is a new instance created, so
// concurrent get calls for the same key build at most one instance.
func (m *InstanceManager) get(ctx context.Context, conf *instanceConf) interface{} {
	fun := "InstanceManager.get -->"

	key := m.buildKey(conf)
	if cached, ok := m.instances.Load(key); ok {
		return cached
	}

	m.mutex.Lock()
	// Deferred so that every return path, and a panic inside newInstance,
	// releases the lock.
	defer m.mutex.Unlock()

	if cached, ok := m.instances.Load(key); ok {
		return cached
	}

	xlog.Infof(ctx, "%s newInstance, role:%v, topic: %s", fun, conf.role, conf.topic)
	created, err := m.newInstance(ctx, conf)
	if err != nil {
		xlog.Errorf(ctx, "%s NewInstance err, topic: %s, err: %s", fun, conf.topic, err.Error())
		return nil
	}

	stored, loaded := m.instances.LoadOrStore(key, created)
	if loaded {
		// Some other writer that does not take m.mutex (add, applyChange)
		// stored an instance for this key first. Hand back theirs and close
		// ours so it is not leaked.
		if cerr := m.closeInstance(ctx, created, conf); cerr != nil {
			xlog.Errorf(ctx, "%s close redundant instance err, topic: %s, err: %v", fun, conf.topic, cerr)
		}
	}
	return stored
}

// applyChange reloads every cached instance affected by a change to config
// key k.
//
// k is parsed once into its group and topic. An instance is affected when its
// topic equals the parsed topic and the parsed group is either the instance's
// group or pub.DefaultRouteGroup. Each affected instance is closed and
// rebuilt from its conf. If rebuilding fails the entry is removed from the
// cache, so a later get will try to create it again.
//
// If the middleware config cannot be obtained or k cannot be parsed, the
// error is logged and nothing is reloaded.
func (m *InstanceManager) applyChange(ctx context.Context, k string, change *xconfig.Change) {
	fun := "InstanceManager.applyChange-->"
	xlog.Infof(ctx, "%s apply change:%v to key:%s", fun, change, k)

	// Neither the config handle nor the parsed key depends on the instance
	// being visited, so resolve both once up front rather than per entry.
	middlewareConfig, err := GetMiddlewareConfig()
	if err != nil {
		xlog.Errorf(ctx, "%s middlewareconf err: %v", fun, err)
		return
	}

	keyParts, err := middlewareConfig.ParseMQKey(ctx, k)
	if err != nil {
		xlog.Errorf(ctx, "%s parse key:%s failed err:%v", fun, k, err)
		return
	}

	// The callback always returns true: one bad entry must not stop the
	// remaining entries from being visited.
	m.instances.Range(func(key, val interface{}) bool {
		sk, ok := key.(string)
		if !ok {
			xlog.Errorf(ctx, "%s key:%v should be string", fun, key)
			return true
		}

		conf, err := m.confFromKey(sk)
		if err != nil {
			xlog.Errorf(ctx, "%s failed to convert key:%s to conf", fun, sk)
			return true
		}

		// NOTE: matching group and topic is enough to treat the instance's
		//       config as changed. To keep the logic simple the instance is
		//       reloaded whatever the change was; ChangeTypes are not handled
		//       individually.
		groupMatches := keyParts.Group == conf.group || keyParts.Group == pub.DefaultRouteGroup
		if !groupMatches || keyParts.Topic != conf.topic {
			return true
		}

		xlog.Infof(ctx, "%s update instance:%v", fun, val)
		// NOTE: close the old instance, then load a new one. A failure to
		//       close the old instance is only logged.
		if err := m.closeInstance(ctx, val, conf); err != nil {
			xlog.Errorf(ctx, "%s close instance err:%v", fun, err)
		}

		in, err := m.newInstance(ctx, conf)
		if err != nil {
			// Log the failure before dropping the entry.
			xlog.Errorf(ctx, "%s new instance err, topic: %s, err: %v", fun, conf.topic, err)
			m.instances.Delete(key)
			return true
		}
		m.instances.Store(key, in)
		return true
	})
}

// applyChangeEvent is the observer callback registered by watch. It logs the
// event and forwards each MODIFY or DELETE change to applyChange, keyed by
// the changed config key. Changes of any other type are ignored.
func (m *InstanceManager) applyChangeEvent(ctx context.Context, ce *xconfig.ChangeEvent) {
	xlog.Infof(ctx, "got new change event:%v", ce)

	for changedKey, chg := range ce.Changes {
		// NOTE: only MODIFY and DELETE changes are of interest.
		if kind := chg.ChangeType; kind == xconfig.MODIFY || kind == xconfig.DELETE {
			m.applyChange(ctx, changedKey, chg)
		}
	}
}

// watch registers applyChangeEvent as an observer on the middleware config.
// If the middleware config cannot be obtained, the error is logged and no
// observer is registered.
func (m *InstanceManager) watch(ctx context.Context) {
	cfg, err := GetMiddlewareConfig()
	if err == nil {
		cfg.RegisterObserver(ctx, m.applyChangeEvent)
		return
	}
	xlog.Errorf(ctx, "watch err: %v", err)
}

// getKafkaReader returns the cached (or newly created) instance for conf as a
// *kafka.KafkaReader. It returns nil when get yields nothing, and logs and
// returns nil when the instance is of a different type.
func (m *InstanceManager) getKafkaReader(ctx context.Context, conf *instanceConf) *kafka.KafkaReader {
	fun := "InstanceManager.getKafkaReader -->"

	instance := m.get(ctx, conf)
	if instance == nil {
		return nil
	}

	if reader, isReader := instance.(*kafka.KafkaReader); isReader {
		return reader
	}

	xlog.Errorf(ctx, "%s in.(Reader) err, topic: %s", fun, conf.topic)
	return nil
}

// getDelayClient returns the cached (or newly created) instance for conf as a
// *delay.DelayClient. It returns nil when get yields nothing, and logs and
// returns nil when the instance is of a different type.
func (m *InstanceManager) getDelayClient(ctx context.Context, conf *instanceConf) *delay.DelayClient {
	// Label carries the same " -->" suffix as the sibling getters.
	fun := "InstanceManager.getDelayClient -->"

	in := m.get(ctx, conf)
	if in == nil {
		return nil
	}

	client, ok := in.(*delay.DelayClient)
	if !ok {
		// The message names the type actually asserted; it previously said
		// "Reader", copied from getKafkaReader.
		xlog.Errorf(ctx, "%s in.(*delay.DelayClient) err, topic: %s", fun, conf.topic)
		return nil
	}
	return client
}

// getKafkaWriter returns the cached (or newly created) instance for conf as a
// *kafka.KafkaWriter. It returns nil when get yields nothing, and logs and
// returns nil when the instance is of a different type.
func (m *InstanceManager) getKafkaWriter(ctx context.Context, conf *instanceConf) *kafka.KafkaWriter {
	fun := "InstanceManager.getKafkaWriter -->"

	switch w := m.get(ctx, conf).(type) {
	case nil:
		// get already logged why nothing is available.
		return nil
	case *kafka.KafkaWriter:
		return w
	default:
		xlog.Errorf(ctx, "%s in.(Writer) err, topic: %s", fun, conf.topic)
		return nil
	}
}

// getPulsarProducer returns the cached (or newly created) instance for conf
// as a *pulsar.Producer. It returns nil when get yields nothing, and logs and
// returns nil when the instance is of a different type.
func (m *InstanceManager) getPulsarProducer(ctx context.Context, conf *instanceConf) *pulsar.Producer {
	// The label previously read "getKafkaWriter", which attributed this
	// function's log lines to the wrong getter.
	fun := "InstanceManager.getPulsarProducer -->"

	in := m.get(ctx, conf)
	if in == nil {
		return nil
	}

	producer, ok := in.(*pulsar.Producer)
	if !ok {
		xlog.Errorf(ctx, "%s in.(*pulsar.Producer) err, topic: %s", fun, conf.topic)
		return nil
	}

	return producer
}

// getPulsarConsumer returns the cached (or newly created) instance for conf
// as a *pulsar.Consumer. It returns nil when get yields nothing, and logs and
// returns nil when the instance is of a different type.
func (m *InstanceManager) getPulsarConsumer(ctx context.Context, conf *instanceConf) *pulsar.Consumer {
	fun := "InstanceManager.getPulsarConsumer -->"

	switch c := m.get(ctx, conf).(type) {
	case nil:
		// get already logged why nothing is available.
		return nil
	case *pulsar.Consumer:
		return c
	default:
		xlog.Errorf(ctx, "%s in.(*pulsar.Consumer) err, topic: %s", fun, conf.topic)
		return nil
	}
}

// Close closes every cached instance and removes it from the cache.
//
// An entry whose key is not a string, or whose key cannot be parsed back into
// an instanceConf, is logged and skipped; the remaining entries are still
// processed. A failure to close an instance is logged, and the entry is
// removed anyway.
func (m *InstanceManager) Close() {
	fun := "InstanceManager.Close -->"

	ctx := context.TODO()

	m.instances.Range(func(key, value interface{}) bool {
		xlog.Infof(ctx, "%s key:%v", fun, key)

		skey, ok := key.(string)
		if !ok {
			xlog.Errorf(ctx, "%s key:%v", fun, key)
			// Keep iterating. Returning false here would stop the Range and
			// leave every remaining instance open.
			return true
		}

		conf, err := m.confFromKey(skey)
		if err != nil {
			xlog.Errorf(ctx, "%s key:%v, err:%s", fun, key, err)
			return true
		}

		if err := m.closeInstance(ctx, value, conf); err != nil {
			xlog.Errorf(ctx, "%s close instance err:%v", fun, err)
		}

		m.instances.Delete(key)
		return true
	})
}

// closeInstance closes instance according to conf.role.
//
// For the Kafka reader, Kafka writer, Pulsar producer and Pulsar consumer
// roles the instance is asserted to the matching client type and its Close
// result is returned. If the instance is not of the type the role implies, an
// error describing the mismatch is returned. For any other role nothing is
// done and nil is returned.
func (m *InstanceManager) closeInstance(ctx context.Context, instance interface{}, conf *instanceConf) error {
	fun := "InstanceManager.closeInstance-->"

	switch conf.role {
	case RoleTypeKafkaReader:
		if reader, ok := instance.(*kafka.KafkaReader); ok {
			return reader.Close()
		}
		return fmt.Errorf("%s instance:%#v should be reader", fun, instance)

	case RoleTypeKafkaWriter:
		if writer, ok := instance.(*kafka.KafkaWriter); ok {
			return writer.Close()
		}
		return fmt.Errorf("%s instance:%#v should be writer", fun, instance)

	case RoleTypePulsarProducer:
		if producer, ok := instance.(*pulsar.Producer); ok {
			return producer.Close()
		}
		return fmt.Errorf("%s instance:%#v should be producer", fun, instance)

	case RoleTypePulsarConsumer:
		if consumer, ok := instance.(*pulsar.Consumer); ok {
			return consumer.Close()
		}
		return fmt.Errorf("%s instance:%#v should be consumer", fun, instance)
	}

	return nil
}
